查看原文
其他

集群管理工具KafkaAdminClient——改造

朱小厮 朱小厮的博客 2020-10-17




1前文概述

在上一篇文章《集群管理工具KafkaAdminClient——原理与示例》中讲述了KafkaAdminClient的功能以及相应的原理,但是同时也提出了目前的KafkaAdminClient并没有非常的完善,还有许多功能还需要去丰富,这些功能可以自定义实现,在《如何获取Kafka的消费者详情》一文中介绍了如何获取Kafka的消费详情,其原理是通过Java调用Kafka的Scala代码实现的,如果要使用纯Java的方式实现就需要用到了KafkaAdminClient,另外Scala版的AdminClient也被标注为:“This client is deprecated, and will be replaced by KafkaAdminClient.”,说明官方也推荐使用KafkaAdminClient。不过现在的版本(目前最新1.1.0)并没有提供类似describeConsumerGroup和listGroupOffsets的方法实现,这一点在前文《集群管理工具KafkaAdminClient——原理与示例》也有提及,所以如果要实现获取类似消费者详情的功能,那么就需要自己动手进行改造。

参考Scala版的AdminClient,要实现获取Kafka的消费者详情的功能首先需要实现describeConsumerGroup和listGroupOffsets的方法,其中describeConsumerGroup方法内部还需要一个findCoordinator的方法用来提供消费者对应的coodinator节点,以便提供详细的消费者详情。describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法都将在KafkaAdminClient类里提供自定义实现。KafkaAdminClient和XXXOptions、XXXResult的类都位于org.apache.kafka.clients.admin包下,笔者也将扩展的类也置于其同一包下,不过也进行了一些区分,如下图所示,新加入的XXXOptions、XXXResult类放入extend下,新加入的JavaBean放入model下,然后与具体应用功能对应的放在app下:

首先建立对应的XXXOptions、XXXResult类,就那简单的ListGroupOffsets来说,其ListGroupOffsetsOptions只是继承了AbstractOptions的空实现,而ListGroupOffsetsResult也很简单,提供了一个KafkaFuture的调用,代码参考如下:

public class ListGroupOffsetsResult {
   private final KafkaFutureImpl<Map<TopicPartition, Long>> future;
   public ListGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Long>> future) {
       this.future = future;
   }
   public KafkaFutureImpl<Map<TopicPartition, Long>> values(){
       return this.future;
   }
}

model目录下的ConsumerGroupSummary是所要实现的describeConsumerGroup方法中所要获取的值类型,封装在DescribeConsumerGroupResult 中;ConsumerSummary在describeConsumerGroup方法内部使用,用来封装消费状态,包括consumerId、clientId、host(消费者主机)以及TopicPartition列表,最终被封装进ConsumerGroupSummary中。PartitionAssignmentState是服务于KafkaConsumerGroupService的,用来最后显示消费者详情列表。

KafkaAdminClient的父类是AdminClient(kafka-client中的抽象类),describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法也需要在AdminClient类中做申明,详细参考如下:

public abstract DescribeConsumerGroupResult describeConsumerGroup(final String group,
                                                        final DescribeConsumerGroupOptions options)
;
public DescribeConsumerGroupResult describeConsumerGroup(final String group) {
   return describeConsumerGroup(group, new DescribeConsumerGroupOptions());
}
public abstract FindCoordinatorResult findCoordinator(final String group,
                                                     final FindCoordinatorOptions options)
;
public FindCoordinatorResult findCoordinator(final String group) {
   return findCoordinator(group, new FindCoordinatorOptions());
}
public abstract ListGroupOffsetsResult listGroupOffsets(final String group,
                                                       final ListGroupOffsetsOptions options)
;
public ListGroupOffsetsResult listGroupOffsets(final String group){
   return listGroupOffsets(group, new ListGroupOffsetsOptions());
}

在前面2篇文章《集群管理工具KafkaAdminClient——原理与示例》和《如何获取Kafka的消费者详情》中都详细解释了describeConsumerGroup、listGroupOffsets方法,所以这里不在赘述,具体实现也很简单,可以参考笔者的实现:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

最后来讲述一下org.apache.kafka.clients.admin.app包下的KafkaConsumerGroupService,具体代码地址在这里:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/org/apache/kafka/clients/admin/app/KafkaConsumerGroupService.java,其内部通过上面改造的KafkaAdminClient和KafkaConsumer来实现,其内部逻辑和《如何获取Kafka的消费者详情》一文中的KafkaConsumerGroupCustomService一样,这里就不在赘述了。

本篇以及《Kafka的Lag计算误区及正确实现》、《如何获取Kafka的消费者详情》这三篇文章都是围绕如何获取消费者详情来做具体的陈述,回到问题的初衷:kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,那么真的需要这么复杂的转变过程么,详细请参考下下篇《Scala与Java语言的互操作》。

END

下期预告

Kafka解析之topic创建(3)——合法性验证

往期精彩

集群管理工具KafkaAdminClient——原理与示例

如何获取Kafka的消费者详情

Kafka的Lag计算误区及正确实现

Kafka解析之topic创建(1)

     

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存